package com.zenitera.bigdata.tableapiflinksql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL demo: an OVER window aggregation on event time.
 *
 * <p>Reads sensor readings from a CSV file, derives an event-time column
 * with a 5-second watermark, and computes a per-sensor rolling sum of the
 * current row plus the single preceding row (ROWS window).
 */
public class Flink07_FlinkSQL_OverWindow01 {
    public static void main(String[] args) {
        // Single-parallelism streaming environment so the printed rows
        // arrive in a deterministic order for this demo.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // DDL: filesystem CSV source with a computed event-time column `t`
        // (epoch millis -> TIMESTAMP) and a watermark 5 seconds behind it.
        final String createSensorTable =
                "create table sensor("
                        + "id string,"
                        + "ts bigint,"
                        + "vc int,"
                        + "t as to_timestamp(from_unixtime(ts/1000,'yyyy-MM-dd HH:mm:ss')),"
                        + "watermark for t as t - interval '5' second)"
                        + "with("
                        + "'connector' = 'filesystem',"
                        + "'path' = 'input/sensor.txt',"
                        + "'format' = 'csv'"
                        + ")";
        tEnv.executeSql(createSensorTable);

        // OVER window: per-id, ordered by event time, summing the previous
        // row and the current row (a sliding 2-row sum).
        final String rollingSumQuery =
                "select "
                        + "id,"
                        + "vc,"
                        + "sum(vc) over(partition by id order by t rows between 1 PRECEDING and current row) vc_sum_value "
                        + "from sensor";
        tEnv.sqlQuery(rollingSumQuery).execute().print();
    }
}
/*
input/sensor.txt
sensor_1,1,10
sensor_1,2,20
sensor_2,4,30
sensor_1,4,400
sensor_2,5,50
sensor_2,6,60
#####################################################
+----+---------------+-------------+--------------+
| op |            id |          vc | vc_sum_value |
+----+---------------+-------------+--------------+
| +I |      sensor_1 |          10 |           10 |
| +I |      sensor_1 |          20 |           30 |
| +I |      sensor_1 |         400 |          420 |
| +I |      sensor_2 |          30 |           30 |
| +I |      sensor_2 |          50 |           80 |
| +I |      sensor_2 |          60 |          110 |
+----+---------------+-------------+--------------+
 */